Lab 6 - Denoising

Part 1. Cleaning up the sounds

Let’s start by cleaning up these sounds as best as we can. We will do a straightforward magnitude spectral subtraction. For all of these sounds there is only noise in the first few seconds of the recording so that you can learn a noise profile from there. Do the following: Perform an STFT of the recordings

  • Estimate the magnitude spectrum of the noise from the beginning of the recording

    • It’s up to you to figure out how many seconds to use (hint: look at the spectrogram)
  • Perform spectral subtraction by subtracting that spectrum from the input’s magnitude STFT

    • Remember to clip any resulting negative values to zero
    • Try to find how much of the noise to subtract so that the output looks good
  • Use the original signal’s phase to convert back to a time series.

Make a note of which examples sound the best and are easier to work with. Try to explain why. At this point some of the outputs will exhibit “musical noise”. To minimize its effects apply a median filter on the denoised magnitude spectrogram to make it sound better (hint: scipy.signal.medfilt2). How big should the median window be? Try different values and find which work best.

In [12]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.io.wavfile import read
from scipy import signal
from copy import deepcopy
import math

def sound( x, rate=8000, label=''):
    from IPython.display import display, Audio, HTML
    if label is '':
        display( Audio( x, rate=rate))
    else:
        display( HTML( 
        '<style> table, th, td {border: 0px; }</style> <table><tr><td>' + label + 
        '</td><td>' + Audio( x, rate=rate)._repr_html_()[3:] + '</td></tr></table>'
        ))
        
def stft(input_sound, dft_size, hop_size, zero_pad, window):
    result = []
    for i in range(0, len(input_sound), hop_size):
        slot = []
        start = i
        end = i + dft_size
        if end >= len(input_sound):
            slot = input_sound[start:]
            zero = np.array([0] * (end - len(input_sound)))
            slot = np.concatenate([slot, zero])
        else:
            slot = input_sound[start:end]
        result.append(np.multiply(np.fft.rfft(np.concatenate([np.multiply(slot, window), np.array(zero_pad)])), (hop_size * 2 / dft_size)))
    return result

def show_graph(sub_fig, freq_data, amp_data_len, sample_rate, mode="spectrogram", title=""):
    if mode == "spectrogram":
        transposed_data = np.log(np.absolute(freq_data).real).T
        t1 = np.linspace(0, amp_data_len / sample_rate, len(transposed_data[0]))
        t2 = np.linspace(0, sample_rate / 2, len(transposed_data))
        x1, x2 = np.meshgrid(t1, t2)
        sub_fig.pcolormesh(x1, x2, np.array(transposed_data))
        sub_fig.set_title(title)
        
def get_bg_amp(input_sound, dft_size, hop_size, zero_pad, window):
    spectrogram = np.abs(stft(input_sound, dft_size, hop_size, zero_pad, window))
    return np.mean(spectrogram, axis=0)

def spec_subtract(alpha, med_kern_size, bg_amp, spectrogram):
    result = np.zeros(np.array(spectrogram).shape, dtype=complex)
    multiplier = np.zeros(np.array(spectrogram).shape, dtype=float)
    for j in range(len(spectrogram)):
        for i in range(len(spectrogram[j])):
            multiplier[j][i] = max((abs(spectrogram[j][i]) - abs(bg_amp[i]) * alpha) / abs(spectrogram[j][i]), 0.0)
    result = spectrogram * np.add(signal.medfilt2d(multiplier, kernel_size=med_kern_size), 0.00000001)
            
    return result
    
def istft(stft_output, dft_size, hop_size, zero_pad, window):
    # YOUR CODE HERE
    result = np.array([0] * (hop_size * len(stft_output) - hop_size + dft_size), dtype=float)
    for i in range(len(stft_output)):
        each_wave = np.fft.irfft(stft_output[i], n=dft_size + len(zero_pad))[:dft_size]
        result[(i * hop_size):(i * hop_size + dft_size)] = np.add(result[(i * hop_size):(i * hop_size + dft_size)], np.multiply(np.multiply(each_wave, window), (0.2)))
    return result[:hop_size * len(stft_output)]
    
        
sr_ws, data_ws = read("wind-speech.wav")
sr_ac, data_ac = read("aircomm.wav")
sr_rs, data_rs = read("room-speech.wav")

dft_size = 512
hop_size = 128
window = window = signal.hann(dft_size)
zero_pad = []

spec_ws = stft(data_ws, dft_size, hop_size, zero_pad, window);
spec_ac = stft(data_ac, dft_size, hop_size, zero_pad, window);
spec_rs = stft(data_rs, dft_size, hop_size, zero_pad, window);

bg_ws = data_ws[:int(sr_ws * 2)]
bg_ac = data_ac[:int(sr_ac * 0.4)]
bg_rs = data_rs[:int(sr_rs * 2)]

bg_amp_ws = get_bg_amp(bg_ws, dft_size, hop_size, zero_pad, window)
bg_amp_ac = get_bg_amp(bg_ac, dft_size, hop_size, zero_pad, window)
bg_amp_rs = get_bg_amp(bg_rs, dft_size, hop_size, zero_pad, window)

spec_ws_denoised = spec_subtract(1.5, 1, bg_amp_ws, spec_ws)
spec_ac_denoised = spec_subtract(2.0, 1, bg_amp_ac, spec_ac)
spec_rs_denoised = spec_subtract(1.5, 1, bg_amp_rs, spec_rs)
spec_ws_denoised_smooth_3 = spec_subtract(1.5, 3, bg_amp_ws, spec_ws)
spec_ac_denoised_smooth_3 = spec_subtract(2.0, 3, bg_amp_ac, spec_ac)
spec_rs_denoised_smooth_3 = spec_subtract(1.5, 3, bg_amp_rs, spec_rs)
spec_ws_denoised_smooth_5 = spec_subtract(1.5, 5, bg_amp_ws, spec_ws)
spec_ac_denoised_smooth_5 = spec_subtract(2.0, 5, bg_amp_ac, spec_ac)
spec_rs_denoised_smooth_5 = spec_subtract(1.5, 5, bg_amp_rs, spec_rs)

data_ws_denoised = istft(spec_ws_denoised, dft_size, hop_size, zero_pad, window)
data_ac_denoised = istft(spec_ac_denoised, dft_size, hop_size, zero_pad, window)
data_rs_denoised = istft(spec_rs_denoised, dft_size, hop_size, zero_pad, window)
data_ws_denoised_smooth_3 = istft(spec_ws_denoised_smooth_3, dft_size, hop_size, zero_pad, window)
data_ac_denoised_smooth_3 = istft(spec_ac_denoised_smooth_3, dft_size, hop_size, zero_pad, window)
data_rs_denoised_smooth_3 = istft(spec_rs_denoised_smooth_3, dft_size, hop_size, zero_pad, window)
data_ws_denoised_smooth_5 = istft(spec_ws_denoised_smooth_5, dft_size, hop_size, zero_pad, window)
data_ac_denoised_smooth_5 = istft(spec_ac_denoised_smooth_5, dft_size, hop_size, zero_pad, window)
data_rs_denoised_smooth_5 = istft(spec_rs_denoised_smooth_5, dft_size, hop_size, zero_pad, window)

print("wind speech:")
sound(data_ws, rate=sr_ws, label='wind-speech original')
sound(data_ws_denoised, rate=sr_ws, label='wind-speech denoised')
sound(data_ws_denoised_smooth_3, rate=sr_ws, label='wind-speech denoised & smoothed 3')
sound(data_ws_denoised_smooth_5, rate=sr_ws, label='wind-speech denoised & smoothed 5')
print("aircomm:")
sound(data_ac, rate=sr_ac, label='aircomm original')
sound(data_ac_denoised, rate=sr_ac, label='aircomm denoised')
sound(data_ac_denoised_smooth_3, rate=sr_ac, label='aircomm denoised & smoothed 3')
sound(data_ac_denoised_smooth_5, rate=sr_ac, label='aircomm denoised & smoothed 5')
print("room speech:")
sound(data_rs, rate=sr_rs, label='room-speech original')
sound(data_rs_denoised, rate=sr_rs, label='room-speech denoised')
sound(data_rs_denoised_smooth_3, rate=sr_rs, label='room-speech denoised & smoothed 3')
sound(data_rs_denoised_smooth_5, rate=sr_rs, label='room-speech denoised & smoothed 5')


fig, (fig_spec_ws, fig_spec_ws_denoised, fig_spec_ws_denoised_smooth_3, fig_spec_ws_denoised_smooth_5, 
      fig_spec_ac, fig_spec_ac_denoised, fig_spec_ac_denoised_smooth_3, fig_spec_ac_denoised_smooth_5, 
      fig_spec_rs, fig_spec_rs_denoised, fig_spec_rs_denoised_smooth_3, fig_spec_rs_denoised_smooth_5) = plt.subplots(nrows=12)

show_graph(fig_spec_ws, spec_ws, len(data_ws), sr_ws, "spectrogram", "wind-speech origin")
show_graph(fig_spec_ws_denoised, spec_ws_denoised, len(data_ws), sr_ws, "spectrogram", "wind-speech denoised")
show_graph(fig_spec_ws_denoised_smooth_3, spec_ws_denoised_smooth_3, len(data_ws), sr_ws, "spectrogram", "wind-speech denoised & smoothed 3")
show_graph(fig_spec_ws_denoised_smooth_5, spec_ws_denoised_smooth_5, len(data_ws), sr_ws, "spectrogram", "wind-speech denoised & smoothed 5")
show_graph(fig_spec_ac, spec_ac, len(data_ac), sr_ac, "spectrogram", "aircomm origin")
show_graph(fig_spec_ac_denoised, spec_ac_denoised, len(data_ac), sr_ac, "spectrogram", "aircomm denoised")
show_graph(fig_spec_ac_denoised_smooth_3, spec_ac_denoised_smooth_3, len(data_ac), sr_ac, "spectrogram", "aircomm denoised & smoothed 3")
show_graph(fig_spec_ac_denoised_smooth_5, spec_ac_denoised_smooth_5, len(data_ac), sr_ac, "spectrogram", "aircomm denoised & smoothed 5")
show_graph(fig_spec_rs, spec_rs, len(data_rs), sr_rs, "spectrogram", "room-speech origin")
show_graph(fig_spec_rs_denoised, spec_rs_denoised, len(data_rs), sr_rs, "spectrogram", "room-speech denoised")
show_graph(fig_spec_rs_denoised_smooth_3, spec_rs_denoised_smooth_3, len(data_rs), sr_rs, "spectrogram", "room-speech denoised & smoothed 3")
show_graph(fig_spec_rs_denoised_smooth_5, spec_rs_denoised_smooth_5, len(data_rs), sr_rs, "spectrogram", "room-speech denoised & smoothed 5")

fig.set_size_inches(6, 48)
fig.tight_layout()
plt.show()


        
wind speech:
wind-speech original
wind-speech denoised
wind-speech denoised & smoothed 3
wind-speech denoised & smoothed 5
aircomm:
aircomm original
aircomm denoised
aircomm denoised & smoothed 3
aircomm denoised & smoothed 5
room speech:
room-speech original
room-speech denoised
room-speech denoised & smoothed 3
room-speech denoised & smoothed 5

The easiest sample to be denoised is "room-speech" because it's background noise has less fluctuation than other two samples. "Wind-speech" has gradually increasing wind noise so some noise is still detectable at middle part after applying subtraction. The noise of "aircomm"has much bigger amplitude and short time fluctuation than the sound we want to extract, so the musical noise is very noticable. With applying median filter, set kernel size to 3 is high enough for "wind-speech" and "room-speech" to eliminate most musical noise except "aircomm", which should set the size as 5. But in this case, the sound we want to extract is seriously degraded, so we still cannot know what the person is talking about in "aircomm."

Part 2. Implement a Voice Activity Detector (VAD)

For the last sound we have an evolving noise profile, which causes problems since our noise description from the first two seconds isn’t accurate throughout. Because we’re lazy we want to automatically update the noise model and not to select it manually. To do so we need a VAD that lets us know when to gather noise statistics and when to denoise. Do the following:

  • Take the square of the input waveform and lowpass filter it (a lot) to get an energy level over time
    • Experiment with the cutoff frequency so that you get a smooth energy-looking function
  • Set a threshold over which we seem to have speech in the input
  • Implement a real-time spectral subtraction denoiser
    • If an input frame is under threshold, it is noise
    • Keep track of the last few noise frames and their average amplitude will be your noise spectrum
    • If an input is over the threshold it is speech
    • Once you encounter speech perform spectral subtraction with the current noise estimate
In [41]:
def spec_dynamic_subtract(alpha, med_kern_size, speech_or_noise, spectrogram):
    result = np.zeros(np.array(spectrogram).shape, dtype=complex)
    multiplier = np.zeros(np.array(spectrogram).shape, dtype=float)
    noise = []
    
    for j in range(len(spectrogram)):
        if speech_or_noise[j] == 0:
            noise.append(spectrogram[j])
            if len(noise) > 10:
                noise.pop(0)
        bg_amp = np.mean(np.abs(np.array(noise)), axis=0)
        for i in range(len(spectrogram[j])): 
            multiplier[j][i] = max((abs(spectrogram[j][i]) - abs(bg_amp[i]) * alpha) / abs(spectrogram[j][i]), 0.0)
            
    return spectrogram * np.add(signal.medfilt2d(multiplier, kernel_size=med_kern_size), 0.00000001)


spec_ws = stft(data_ws, dft_size, hop_size, zero_pad, window);

lowpass = signal.firwin(1024, 3000, fs=sr_ws)
squared_waveform = np.multiply(data_ws, data_ws)
filtered_data_ws = np.convolve(lowpass, squared_waveform, mode='same')

threshold = 0.02
speech_or_noise = []

for i in range(0, len(spec_ws)):
    if max(np.concatenate([squared_waveform, np.array([0] * 512)])[(i * 128):(i * 128) + 512]) > threshold:
        speech_or_noise.append(1)
    else:
        speech_or_noise.append(0)
        
spec_ws_denoised_dynamic = spec_dynamic_subtract(1.5, 1, speech_or_noise, spec_ws)
data_ws_denoised_dynamic = istft(spec_ws_denoised_dynamic, dft_size, hop_size, zero_pad, window)

sound(data_ws, rate=sr_ws, label='wind-speech original')
sound(data_ws_denoised, rate=sr_ws, label='wind-speech denoised static')
sound(data_ws_denoised_dynamic, rate=sr_ws, label='wind-speech denoised dynamic')

fig, (fig_energy, fig_spec_ws, fig_spec_static, fig_spec_dynamic) = plt.subplots(nrows=4)

fig_energy.plot(squared_waveform)
fig_energy.set_title("energy")
show_graph(fig_spec_ws, spec_ws, len(data_ws), sr_ws, "spectrogram", "wind-speech origin")
show_graph(fig_spec_static, spec_ws_denoised, len(data_ws), sr_ws, "spectrogram", "wind-speech denoised static")
show_graph(fig_spec_dynamic, spec_ws_denoised_dynamic, len(data_ws), sr_ws, "spectrogram", "wind-speech denoised dynamic")

fig.set_size_inches(6, 16)
fig.tight_layout()
plt.show()
wind-speech original
wind-speech denoised static
wind-speech denoised dynamic
In [ ]: